Support per-task transactional leasing in loadTasks #1523


Open: wants to merge 2 commits into main

Conversation

@danielhumanmod (Contributor) commented May 4, 2025

Potentially an improvement related to the fix for #774

Context

Introduce per-task transactional leasing in the metastore layer via loadTasks(...). This allows tasks to be leased and updated one at a time, avoiding the all-or-nothing semantics of bulk operations (which is also mentioned in a TODO).

Motivation

  1. The current loadTasks fetches and updates a batch of tasks in a single transaction. In highly concurrent environments, if multiple executors attempt to lease overlapping tasks, write conflicts become more likely, especially with large batches. Switching to per-task transactions limits the scope of each transaction, reducing the chance of conflicts (though potentially at the cost of throughput).
  2. This behavior was already suggested as a TODO in the original implementation, so it is likely something we want to improve:

// TODO: Consider performing incremental leasing of individual tasks one at a time
// instead of requiring all-or-none semantics for all the tasks we think we listed,
// or else contention could be very bad.

  3. This change can serve as an optional enhancement to PR #1585. Happy to hear any feedback from the community :)
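To make the intended semantics concrete, here is a minimal, self-contained sketch of per-task leasing. The names (Task, tryLeaseInOwnTxn, loadTasks) and the in-memory model are illustrative assumptions, not the actual Polaris metastore API; the point is only that a lease conflict on one task does not abort the rest of the batch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: each task is leased in its own "transaction",
// so one conflict does not fail the whole batch.
public class PerTaskLeasing {
  record Task(long id, boolean alreadyLeased) {}

  // Stand-in for a single-task transaction: the lease succeeds only if
  // the task is not currently held by another executor.
  static boolean tryLeaseInOwnTxn(Task t) {
    return !t.alreadyLeased();
  }

  // Lease up to `limit` tasks one at a time, skipping conflicted tasks
  // instead of applying all-or-none semantics to the listed batch.
  static List<Task> loadTasks(List<Task> candidates, int limit) {
    List<Task> leased = new ArrayList<>();
    for (Task t : candidates) {
      if (leased.size() >= limit) break;
      if (tryLeaseInOwnTxn(t)) {
        leased.add(t);
      }
      // A failed lease affects only this task; others proceed independently.
    }
    return leased;
  }

  public static void main(String[] args) {
    List<Task> tasks =
        List.of(new Task(1L, false), new Task(2L, true), new Task(3L, false));
    System.out.println(loadTasks(tasks, 5).size()); // prints 2
  }
}
```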

@collado-mike (Contributor) commented:

> Introduce per-task transactional leasing in the metastore layer via loadTasks(...). This enables fine-grained compensation by allowing tasks to be leased and updated one at a time, avoiding the all-or-nothing semantics of bulk operations (which is also mentioned in TODO). This is important for retry scenarios, where we want to isolate failures and ensure that tasks are independently retried without affecting each other.

I don't understand how this PR enables isolation of task failures. This PR only reads the tasks from the metastore one at a time, so the only failure would be in loading the task. In a transactional database, the UPDATE ... WHERE statement would only update the task state when the task is not currently leased by another client, so I don't see how one or a few tasks would fail to be leased while the others succeed.

The PR description sounds like it intends to tackle task execution failure - is that right? If so, loading the tasks from the database isn't going to solve that problem.
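The conditional-update semantics described above can be modeled with an atomic compare-and-set. The sketch below is illustrative only (an in-memory map standing in for the database; the names are made up, not Polaris code): the lease takes effect only when the task is not currently held, which is what an `UPDATE ... WHERE owner = ''` guard gives you in a transactional database.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative model of conditional leasing: the lease is granted only if
// the task has no current owner, mirroring UPDATE ... WHERE semantics.
public class ConditionalLease {
  // task id -> current lease owner ("" means unleased)
  private final Map<Long, String> leases = new ConcurrentHashMap<>();

  void addTask(long taskId) {
    leases.put(taskId, "");
  }

  // Analogous to: UPDATE tasks SET owner = :me WHERE id = :id AND owner = ''
  // Returns true iff the "row" was updated, i.e. we won the lease.
  boolean tryLease(long taskId, String executorId) {
    return leases.replace(taskId, "", executorId);
  }

  public static void main(String[] args) {
    ConditionalLease store = new ConditionalLease();
    store.addTask(42L);
    boolean first = store.tryLease(42L, "executor-A");  // wins the lease
    boolean second = store.tryLease(42L, "executor-B"); // loses: already held
    System.out.println(first + " " + second); // prints "true false"
  }
}
```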

@eric-maynard (Contributor) commented:

> The PR description sounds like it intends to tackle task execution failure - is that right? If so, loading the tasks from the database isn't going to solve that problem.

I think it could, just very lazily right @collado-mike? The next time the service restarts, we could retry any orphaned tasks.

@danielhumanmod (Contributor, Author) commented May 8, 2025

> Introduce per-task transactional leasing in the metastore layer via loadTasks(...). This enables fine-grained compensation by allowing tasks to be leased and updated one at a time, avoiding the all-or-nothing semantics of bulk operations (which is also mentioned in TODO). This is important for retry scenarios, where we want to isolate failures and ensure that tasks are independently retried without affecting each other.
>
> I don't understand how this PR enables isolation of task failures. This PR only reads the tasks from the metastore one at a time, so the only failure would be in loading the task. In a transactional database, the UPDATE ... WHERE statement would only update the task state when the task is not currently leased by another client, so I don't see how one or a few tasks would fail to be leased while the others succeed.
>
> The PR description sounds like it intends to tackle task execution failure - is that right? If so, loading the tasks from the database isn't going to solve that problem.

Sorry for the confusion: we actually have a second PR for this feature. I tried to split the work into two parts to make review easier :)

This is the PR for second phase: #1585

Regarding this PR's changes in the metastore, the goal is to allow each task entity to be read and leased individually. This ensures that if an exception occurs while reading or leasing one task, it won't affect others. This improvement was also noted in the TODO comment of the previous implementation. It's not strictly required, but it may be a "nice-to-have" for isolating failures.

Update on May 17:
I have updated the PR's title and description to avoid confusion.

@adnanhemani (Collaborator) left a comment:

A couple of things here: now with pagination being merged, this PR will require further revision to rebase properly imo.

I'm also in agreement with @collado-mike here - but to your response, I don't agree that this should be the way we solve this overall. I'm not sure I see high value in picking only one task at a time to solve the problem we have with tasks and retrying them. Instead, I'd advocate for leaning heavier on the definition of limit. If, after we query the relevant number of tasks using ms.listEntitiesInCurrentTxn, we find that some task has been modified between our querying of it and our attempt to commit our properties to it, we should just filter it out of the resultant set and allow the user to receive all other tasks that were not impacted. If there are no tasks left at the end of this filtering, then that would be the right place to throw the exception. Sure, the function will not return the full "limit" amount of tasks - but I don't see a guarantee of needing that.

I know I've probably not researched this as deeply as you so WDYT?
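The alternative suggested above can be sketched as follows. This is a hypothetical outline of the idea (the Task record, modifiedSinceRead flag, and leaseBatch name are illustrative, not Polaris APIs): lease the batch in one pass, silently drop tasks that were modified concurrently, and throw only when nothing at all could be leased.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of the "filter conflicted tasks out of the batch" approach:
// the caller may receive fewer than `limit` tasks, and an exception is
// thrown only when the filtered result is empty.
public class FilteringBatchLease {
  record Task(long id, boolean modifiedSinceRead) {}

  static List<Task> leaseBatch(List<Task> listed) {
    List<Task> leased = listed.stream()
        .filter(t -> !t.modifiedSinceRead()) // skip concurrently-modified tasks
        .collect(Collectors.toList());
    if (leased.isEmpty()) {
      // Only here, with zero leasable tasks, is failing the call warranted.
      throw new IllegalStateException("no tasks could be leased");
    }
    return leased;
  }

  public static void main(String[] args) {
    List<Task> listed = List.of(new Task(1L, false), new Task(2L, true));
    System.out.println(leaseBatch(listed).size()); // prints 1
  }
}
```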

@danielhumanmod danielhumanmod changed the title Support more reliable async task retry to guarantee eventual execution (1/2) – Metastore Layer Support per-task transactional leasing in loadTasks May 18, 2025
@danielhumanmod (Contributor, Author) commented May 18, 2025

> A couple of things here: now with pagination being merged, this PR will require further revision to rebase properly imo.

Thanks for the reminder; I'll rebase and update it later!

> I'm also in agreement with @collado-mike here - but to your response, I don't agree that this should be the way we solve this overall. I'm not sure I see high value in picking only one task at a time to solve the problem we have with tasks and retrying them. Instead, I'd advocate for leaning heavier on the definition of limit. If, after we query the relevant number of tasks using ms.listEntitiesInCurrentTxn, we find that some task has been modified between our querying of it and our attempt to commit our properties to it, we should just filter it out of the resultant set and allow the user to receive all other tasks that were not impacted. If there are no tasks left at the end of this filtering, then that would be the right place to throw the exception. Sure, the function will not return the full "limit" amount of tasks - but I don't see a guarantee of needing that.
>
> I know I've probably not researched this as deeply as you so WDYT?

That’s a great point — I actually considered that approach initially as well. That said, there were a couple of things that led me to explore the per-task leasing direction instead:

  1. The “skipping failed tasks” logic you described is indeed implemented in the AtomicOperationMetaStoreManager’s loadTasks method (code pointer). My assumption is that, under the atomic semantics of this class, partially leasing a subset of tasks is acceptable.
    However, in the case of TransactionWorkspaceMetaStoreManager, which seems to be designed around stricter transactional semantics, I wasn’t sure whether silently skipping conflicted tasks would still be consistent with its intended guarantees.
  2. I also came across a TODO in the original implementation that seems to suggest exploring incremental per-task leasing.
// TODO: Consider performing incremental leasing of individual tasks one at a time
// instead of requiring all-or-none semantics for all the tasks we think we listed,
// or else contention could be very bad.

But I am not an expert in Polaris's metastore; I'm just sharing the context that led me to this approach. I would really appreciate any feedback or additional insight from the community.

@adnanhemani (Collaborator) commented:

I agree with the analysis you've stated too. I think it really comes down to your point 1) - and to whether someone has context on whether this approach was considered before the TODO from point 2) was written (and if so, why).

I, personally, don't think that the semantics between Transactional and Atomic forces us to make a different implementation here tbh - but would also like any other insight from the community here :)

Successfully merging this pull request may close these issues.

Task handling is incomplete
4 participants